package com.bigdata.wsr.deserialize;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;


/**
 * A JSON format implementation of {@link DebeziumDeserializationSchema} which deserializes the
 * received {@link SourceRecord} to JSON String.
 * <p>
 * Fixes:
 * 1. CDC ingestion of DATE / DATETIME columns emitted raw epoch-based numbers instead of
 *    formatted date strings.
 * 2. CDC ingestion of DECIMAL columns produced unreadable (base64-encoded) output.
 *
 * @author rui.wang
 * @date 2022/11/03
 */
@Slf4j
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    private static final long serialVersionUID = 1L;

    /** Debezium logical-type schema signatures whose values arrive as epoch-based integers. */
    private static final String DATE_SCHEMA = "Schema{io.debezium.time.Date:INT32}";
    private static final String TIMESTAMP_SCHEMA = "Schema{io.debezium.time.Timestamp:INT64}";

    /** Formatters are immutable and thread-safe: build once instead of once per field. */
    private static final DateTimeFormatter DATE_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter DATETIME_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // NOTE(review): the converter is configured lazily but never actually used by
    // deserialize(); kept (with its initialization) so existing configuration behavior
    // and serialized state remain unchanged.
    private transient JsonConverter jsonConverter;

    /**
     * Configuration whether to enable {@link JsonConverterConfig#SCHEMAS_ENABLE_CONFIG} to include
     * schema in messages.
     */
    private final Boolean includeSchema;

    /**
     * The custom configurations for {@link JsonConverter}.
     */
    private Map<String, Object> customConverterConfigs;

    public JsonDebeziumDeserializationSchema() {
        this(false);
    }

    public JsonDebeziumDeserializationSchema(Boolean includeSchema) {
        this.includeSchema = includeSchema;
    }

    public JsonDebeziumDeserializationSchema(Boolean includeSchema, Map<String, Object> customConverterConfigs) {
        this.includeSchema = includeSchema;
        this.customConverterConfigs = customConverterConfigs;
    }

    /**
     * Converts a Debezium change record into a flat JSON string with keys
     * {@code dbTable}, {@code before}, {@code after}, {@code op} and {@code ts_ms}.
     * DATE/DATETIME columns are rendered as formatted strings rather than epoch numbers.
     *
     * @param record the Debezium change event
     * @param out    collector receiving the serialized JSON string
     */
    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        if (jsonConverter == null) {
            initializeJsonConverter();
        }
        Struct value = (Struct) record.value();
        JSONObject data = new JSONObject();

        JSONObject beforeData = extractFields(getStructSafely(value, "before"));
        JSONObject afterData = extractFields(getStructSafely(value, "after"));

        Struct source = value.getStruct("source");
        if (source != null) {
            data.put("dbTable", source.getString("db") + "." + source.getString("table"));
        }
        data.put("before", beforeData);
        data.put("after", afterData);
        data.put("op", value.getString("op"));
        data.put("ts_ms", value.getInt64("ts_ms"));
        out.collect(data.toJSONString());
    }

    /**
     * Reads a nested struct, tolerating records that lack the field.
     * Seen in production after days of normal operation:
     * "org.apache.kafka.connect.errors.DataException: before is not a valid field name".
     *
     * @return the nested struct, or {@code null} if absent or not readable
     */
    private static Struct getStructSafely(Struct value, String fieldName) {
        try {
            return value.getStruct(fieldName);
        } catch (DataException e) {
            // Log the offending record and keep going with an empty row image.
            log.warn("Field '{}' not readable from record value {}", fieldName, value, e);
            return null;
        }
    }

    /**
     * Copies every field of {@code struct} into a JSON object, converting Debezium
     * temporal logical types to human-readable strings.
     *
     * @param struct row image ({@code before}/{@code after}); may be {@code null}
     * @return JSON object of field name to converted value (empty when struct is null)
     */
    private static JSONObject extractFields(Struct struct) {
        JSONObject result = new JSONObject();
        if (struct == null) {
            return result;
        }
        for (Field field : struct.schema().fields()) {
            result.put(field.name(), convertValue(struct, field));
        }
        return result;
    }

    /**
     * Converts one field value, rewriting Debezium temporal types:
     * <ul>
     *   <li>{@code io.debezium.time.Date} (days since epoch, timezone-free) → "yyyy-MM-dd"</li>
     *   <li>{@code io.debezium.time.Timestamp} (epoch millis of the wall-clock value,
     *       interpreted as UTC) → "yyyy-MM-dd HH:mm:ss"</li>
     * </ul>
     * All other types pass through unchanged.
     */
    private static Object convertValue(Struct struct, Field field) {
        String schema = field.schema().toString();
        String fieldName = field.name();
        if (DATE_SCHEMA.equals(schema)) {
            Integer epochDays = struct.getInt32(fieldName);
            if (epochDays != null) {
                // LocalDate.ofEpochDay avoids both the int overflow of the old
                // `days * 24 * 60 * 60 * 1000L` expression (first three products were
                // int arithmetic, overflowing for dates after 2038-01-19) and the
                // hard-coded UTC+8 offset that only worked on Asia/Shanghai hosts.
                return LocalDate.ofEpochDay(epochDays).format(DATE_FORMATTER);
            }
        } else if (TIMESTAMP_SCHEMA.equals(schema)) {
            Long epochMillis = struct.getInt64(fieldName);
            if (epochMillis != null) {
                // Format in UTC explicitly instead of "minus 8h, then system zone",
                // which silently produced wrong values on non-UTC+8 machines.
                return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC)
                        .format(DATETIME_FORMATTER);
            }
        }
        return struct.get(field);
    }

    /**
     * Initialize {@link JsonConverter} with given configs. NUMERIC decimal format
     * makes DECIMAL columns serialize as plain numbers rather than base64 bytes.
     */
    private void initializeJsonConverter() {
        jsonConverter = new JsonConverter();
        final HashMap<String, Object> configs = new HashMap<>(4);
        configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
        configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
        configs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        if (customConverterConfigs != null) {
            configs.putAll(customConverterConfigs);
        }
        jsonConverter.configure(configs);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
